我們前面講了 Airflow 跟 Flink 的一些基礎觀念跟開發,現在讓我們整理一下兩邊的差異。
首先,Airflow 是一個優秀的排程管理工具,它有自己的 DB 能記錄每個 job 的執行狀況跟參數,當某個 job 失敗之後,能在修正之後快速指定重啟。
但是如果兩個 task 之間有大量資料 (>2GB) 要傳輸,可能會遇到問題。如果想解決的話,又要將資料先暫存在他處,到下一個 task 再載入回來。對於流式資料的處理,也不是非常適合,畢竟它是「排程」
當遇到停機異常時,由於 Airflow 的執行狀況跟 XCom 是記錄在 metadata database 內,所以即使主機掛掉,重啟之後依然能讀取最後的執行階段,並將資料還原。
Flink 則相反,雖然它也有一個 WebUI 可以觀察目前狀況,也有提供 API 確認現在 job 的狀態,但它的排程僅能依賴外部,例如 Linux crontab。不過他是 Java 開發,所以在效能跟 threads 使用上會優於 Python。
當遇到停機異常時,Flink 如果連主要的 master 都掛掉,那執行中的 job 會消失。即使你有設定定期存 Checkpoint,你還是需要找出對應的 checkpoint 存檔點,要從一堆無意義的 hash 資料夾中尋回,存在一定難度。
圖裡就是我某個主機長期執行之後,存下來的 checkpoint 目錄。
所以我建議,如果真的是 streaming 的 job,就讓它在 Flink 上長期掛著執行。再透過 Airflow 呼叫 Flink API 去監控狀態,記錄 jobID, parameters 等參數存在 XCom 內,必要時可以重啟。
而 batch 的工作,如果簡單的話,可以寫在 AIrflow 內,複雜的就在 Flink 開發後,透過 Airflow 排程呼叫。
我們可以寫一個 Airflow 的 operator 來簡化這件事
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import subprocess
class RunFlinkJobOperator(BaseOperator):
"""
Custom Airflow Operator to run a Flink job using a JAR file.
"""
@apply_defaults
def __init__(
self,
jar_path,
flink_home,
flink_command="run",
*args, **kwargs
):
"""
Initialize the operator.
:param jar_path: Path to the Flink JAR file.
:param flink_home: Path to the Flink installation directory.
:param flink_command: Flink command to run (default is 'run').
"""
super().__init__(*args, **kwargs)
self.jar_path = jar_path
self.flink_home = flink_home
self.flink_command = flink_command
def execute(self, context):
"""
Execute the Flink job using the specified JAR file.
:param context: Airflow execution context.
"""
# Construct the Flink command
flink_cmd = f"{self.flink_home}/bin/flink {self.flink_command} {self.jar_path}"
# Run the Flink job
try:
result = subprocess.run(flink_cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.log.info(result.stdout)
self.log.info(result.stderr)
self.log.info("Flink job completed successfully.")
except subprocess.CalledProcessError as e:
self.log.error(f"Flink job failed with error: {e}")
raise e
甚至連 jar 檔都可以定期的透過 airflow 從 maven 下載更新,但這部份先略過不提。
希望這樣的組合能讓你的 ETL 更加流暢,讓適合的工具來協助我們。